package tableapi

import bean.SensorReading
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment}
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}

/**
  * @Description: TODO QQ1667847363
  * @author: xiao kun tai
  * @date:2021/11/27 12:14
  */
object Table2_Api {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //file source
    val inputPath: String = "src/main/resources/sensor.txt"
    val fileStream: DataStream[String] = env.readTextFile(inputPath)

    val socketStream = env.socketTextStream("192.168.88.106", 7777)

    //先转换为特定的类型
    val dataStream: DataStream[SensorReading] = fileStream.map(data => {
      val arr = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    //创建表执行环境
    val tableEnv = StreamTableEnvironment.create(env)

    //基于老版本planner的流处理
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useOldPlanner()
      .inStreamingMode()
      .build()
    val oldStreamTablEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

    //基于老版本planner的批处理
    val batchEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val oldBatchTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(batchEnv)

    //基于blink planner的流处理
    val blinkStreamSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val blinkStreamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, blinkStreamSettings)

    //基于blink planner的批处理
    val blinkBatchSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    val blinkBatchTableEnv: TableEnvironment = TableEnvironment.create(blinkBatchSettings)


    env.execute("table api test")
  }


}
